# -*- coding: UTF-8 -*-
import os
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
# JAVA_HOME is assigned before the session is built; presumably so the JVM that
# Spark launches uses the JDK at this path -- confirm against the deployment host.
os.environ['JAVA_HOME'] = '/usr/java/jdk1.8.0_181-cloudera'

# Local-mode session ("local[*]") with Hive support enabled, configured with
# the metastore thrift URI on 127.0.0.1:9083.
spark_session = (
    SparkSession.builder
    .master("local[*]")
    .appName("hive_test_1")
    .config("hive.metastore.uris", "thrift://127.0.0.1:9083")
    .enableHiveSupport()
    .getOrCreate()
)
import time
def get_jd_sql():
    """Return the SQL text that selects ``rcn_prod.t_jd`` rows with their details.

    The query uses comma-style (implicit) joins between ``t_jd``,
    ``t_jd_content``, ``t_company`` and ``t_delivery_project``, matched on
    ``t_jd.jd_content_id``, ``t_jd_content.company_id`` and
    ``t_jd.main_project_id`` respectively.  The string is returned verbatim;
    nothing is executed here.
    """
    return """
select rcn_prod.t_jd.id , rcn_prod.t_jd.jd_content_id , rcn_prod.t_jd_content.title , rcn_prod.t_jd_content.company_name , rcn_prod.t_jd_content.work_age , rcn_prod.t_jd_content.workplaces , rcn_prod.t_jd_content.recruit_number , rcn_prod.t_jd_content.min_annual_salary ,  rcn_prod.t_jd_content.max_annual_salary  , rcn_prod.t_company.name as  t_companyname    , rcn_prod.t_delivery_project.degree , rcn_prod.t_delivery_project.work_age as workage   from rcn_prod.t_jd , rcn_prod.t_jd_content , rcn_prod.t_company , rcn_prod.t_delivery_project
 where rcn_prod.t_jd.jd_content_id  =  rcn_prod.t_jd_content.id  and rcn_prod.t_company.id  = rcn_prod.t_jd_content.company_id and rcn_prod.t_delivery_project.id  =  rcn_prod.t_jd.main_project_id 

    """


def get_cv_sql():
    """Return the SQL text that selects ``rcn_prod.t_talent_resume`` rows.

    Each resume row is combined, via comma-style (implicit) joins, with its
    ``t_school``, ``t_major`` and ``t_work_status`` rows (exposed as
    ``school_*``, ``major_*`` and ``status_*`` columns) and the result is
    ordered by ``created_at`` descending.  The string is returned verbatim;
    nothing is executed here.
    """
    return """
    select 

     rcn_prod.t_talent_resume.id as id , rcn_prod.t_talent_resume.name , rcn_prod.t_talent_resume.date_of_birth , rcn_prod.t_talent_resume.gender , rcn_prod.t_talent_resume.age , rcn_prod.t_talent_resume.marital_status , rcn_prod.t_talent_resume.work_year , rcn_prod.t_talent_resume.current_company , rcn_prod.t_talent_resume.current_position , rcn_prod.t_talent_resume.current_salary   ,    rcn_prod.t_talent_resume.industry  , 
       rcn_prod.t_school.name as school_name , rcn_prod.t_school.id as school_id  ,  
      rcn_prod.t_major.name as major_name ,   rcn_prod.t_major.id as major_id , rcn_prod.t_work_status.name as status_name ,  rcn_prod.t_work_status.id as status_id ,  
     rcn_prod.t_talent_resume.created_at

    from rcn_prod.t_talent_resume , rcn_prod.t_school ,    rcn_prod.t_major , rcn_prod.t_work_status  
    WHERE 

     rcn_prod.t_school.id = rcn_prod.t_talent_resume.school_id AND
     rcn_prod.t_major.id = rcn_prod.t_talent_resume.major_id AND
     rcn_prod.t_work_status.id = rcn_prod.t_talent_resume.work_status_id  
    order by  rcn_prod.t_talent_resume.created_at DESC
 

    """
def get_order_sql():
    """Return the SQL text that builds labelled delivery-order rows.

    The query is a ``UNION`` of two selects over ``rcn_prod.t_delivery_order``:
    orders that have an operation with ``operation_type = 401`` are emitted
    with ``label`` 1, and orders that have one with ``operation_type = 402``
    are emitted with ``label`` 0.  The business meaning of 401/402 is not
    visible in this file.  The string is returned verbatim; nothing is
    executed here.
    """
    return """

    (SELECT
    	id,
    	jd_id,
    	resume_id,1 as label
    	FROM
    	rcn_prod.t_delivery_order 
    	WHERE EXISTS ( SELECT 1 FROM rcn_prod.t_delivery_order_operation WHERE rcn_prod.t_delivery_order.id = rcn_prod.t_delivery_order_operation.order_id AND rcn_prod.t_delivery_order_operation.operation_type = 401 )
    	)UNION
    	(SELECT
    	id,
    	jd_id,
    	resume_id,0 as label
    	FROM
    	rcn_prod.t_delivery_order 
    	WHERE EXISTS ( SELECT 1 FROM rcn_prod.t_delivery_order_operation WHERE rcn_prod.t_delivery_order.id = rcn_prod.t_delivery_order_operation.order_id AND rcn_prod.t_delivery_order_operation.operation_type = 402 )
    	)
    """

# Materialise the three query strings once, before any Spark work starts.
jdsql, cv_sql, order_sql = get_jd_sql(), get_cv_sql(), get_order_sql()

# Wall-clock start; the elapsed time is printed at the end of the script.
s = time.time()
def create_hive(dataframe, db, table):
    """Save ``dataframe`` as the table ``db.table`` using the "hive" format.

    The write mode is "overwrite".

    Args:
        dataframe: object exposing the DataFrame ``write`` API.
        db: database name, used as the prefix of the qualified table name.
        table: table name, used as the suffix of the qualified table name.

    Returns:
        None.
    """
    writer = dataframe.write.format("hive").mode("overwrite")
    qualified_name = '{}.{}'.format(db, table)
    writer.saveAsTable(qualified_name)

def _rename_required_column(frame, old_name, new_name):
    """Rename ``old_name`` to ``new_name`` on ``frame``, failing if it is absent.

    ``DataFrame.withColumnRenamed`` returns the frame unchanged when the
    source column does not exist, so the presence check has to be explicit.
    A real ``raise`` is used instead of ``assert`` because assert statements
    are removed when Python runs with ``-O``, which would let a frame with the
    wrong schema be written out before any later step noticed.

    Args:
        frame: DataFrame whose column is renamed.
        old_name: column that must already exist on ``frame``.
        new_name: name the column carries in the returned frame.

    Returns:
        A DataFrame with the column renamed.

    Raises:
        ValueError: if ``old_name`` is not one of ``frame.columns``.
    """
    if old_name not in frame.columns:
        raise ValueError(
            'expected column {!r} not found; available columns: {}'.format(
                old_name, frame.columns))
    return frame.withColumnRenamed(old_name, new_name)


# Run the three source queries. Each one exposes an ``id`` column, so every
# ``id`` is renamed to a distinct name before the frames are joined.
jd = spark_session.sql(jdsql)
cv = spark_session.sql(cv_sql)
order = _rename_required_column(spark_session.sql(order_sql), 'id', 'order_id')
jd = _rename_required_column(jd, 'id', 'outer_jd_id')
create_hive(jd, 'my_test', 'rcn_jd_data')

# First join: attach the jd columns to each labelled order row.
relation = order['jd_id'] == jd['outer_jd_id']
first_join = order.join(jd, relation)
first_join.show(5)

print(cv.columns)
cv = _rename_required_column(cv, 'id', 'outer_resume_id')
create_hive(cv, 'my_test', 'rcn_resume_data')

# Second join: attach the resume columns to the order/jd rows.
relation2 = first_join['resume_id'] == cv['outer_resume_id']
print(first_join.columns)
second_join = first_join.join(cv, relation2)
second_join.show(5)
create_hive(second_join, 'my_test', 'matching_training_data')

print('cv', cv.count())
# Elapsed wall-clock seconds since ``s`` was recorded.
print(time.time() - s)

#cv.toPandas().to_csv('alljd_hive.csv')